package com.sophie;

import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.logging.Logger;

@RestController
@RequestMapping
@SpringBootApplication
public class KafkaLeamApplication {


    private static final Logger log = (Logger) LoggerFactory.getLogger(KafkaLeamApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaLeamApplication.class, args);
    }


    @Autowired
    private KafkaTemplate template;

    private static final String topic = "xixi";

    @RequestMapping("/index")
    public String index() {
        return "hello,xxx";
    }

    /**
     * 事务1
     *
     * @param input
     * @return
     */
    @GetMapping("sendToKafka/{input}")
    public String sendToKafka(@PathVariable String input) {
        this.template.send(topic, input);
        // 事务的支持
        template.executeInTransaction(t -> {
            t.send(topic, input);
            if ("error".equals(input)) {
                throw new RuntimeException("input is error");
            }
            t.send(topic, input + " author");
            return true;
        });
        return "send success:" + input;
    }

    /**
     * 事务2
     *
     * @param input
     * @return
     */
    @GetMapping("sendToKafka2/{input}")
    @Transactional(rollbackFor = RuntimeException.class)
    public String sendToKafka2(@PathVariable String input) {
        // this.template.send(topic, input);
        // 事务的支持
        template.send(topic, input);
        if ("error".equals(input)) {
            throw new RuntimeException("input is error");
        }
        template.send(topic, input + " author");
        return "send success:" + input;
    }

    @KafkaListener(id = "", topics = topic, groupId = "group.demo")
    public void listener(String input) {
        log.info("message input value " + input);
    }
}
